[improve][ml] Support Bookkeeper batch read #23180
dao-jun wants to merge 12 commits into apache:master
Conversation
# Conflicts:
#   managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/EntryCacheDisabled.java
#   managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheImpl.java
if (!useBatchRead(entriesToRead, useBookkeeperV2WireProtocol, isStriped)) {
    return handle.readAsync(firstEntry, lastEntry);
}
return handle.batchReadAsync(firstEntry, entriesToRead, 0);
If maxSize = 0, an INFO log will be printed for each call, which will affect performance.
https://github.com/apache/bookkeeper/blob/7c41204506122ed6904289f4814d4130274874aa/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerHandle.java#L955-L967
private CompletableFuture<LedgerEntries> batchReadEntriesInternalAsync(long startEntry, int maxCount, long maxSize,
boolean isRecoveryRead) {
int nettyMaxFrameSizeBytes = clientCtx.getConf().nettyMaxFrameSizeBytes;
if (maxSize > nettyMaxFrameSizeBytes) {
LOG.info(
"The max size is greater than nettyMaxFrameSizeBytes, use nettyMaxFrameSizeBytes:{} to replace it.",
nettyMaxFrameSizeBytes);
maxSize = nettyMaxFrameSizeBytes;
}
if (maxSize <= 0) {
LOG.info("The max size is negative, use nettyMaxFrameSizeBytes:{} to replace it.", nettyMaxFrameSizeBytes);
maxSize = nettyMaxFrameSizeBytes;
}
If isAutoSkipNonRecoverableData is set to true, is the failure handling for batch reads consistent with the existing behavior here?
Regarding the maxSize = 0 info log: I created apache/bookkeeper#4485 to change the log level to debug.
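As an alternative on the caller side (a hedged sketch under the assumption that Pulsar could pick its own byte cap; it is not what this PR does), the third argument of batchReadAsync could be an explicit positive limit instead of 0, so LedgerHandle never takes the "max size is negative" branch and its per-call INFO log:

import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.client.api.LedgerEntries;
import org.apache.bookkeeper.client.api.ReadHandle;

class BatchReadWithExplicitMaxSize {
    // Hypothetical caller-side cap, kept below nettyMaxFrameSizeBytes so the
    // BookKeeper client does not need to clamp or replace it.
    private static final long MAX_BATCH_READ_BYTES = 4 * 1024 * 1024;

    static CompletableFuture<LedgerEntries> readBatch(ReadHandle handle, long firstEntry, int entriesToRead) {
        return handle.batchReadAsync(firstEntry, entriesToRead, MAX_BATCH_READ_BYTES);
    }
}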
int entriesToRead = (int) (lastEntry - firstEntry + 1);
// Batch read is not supported for striped ledgers.
LedgerMetadata m = handle.getLedgerMetadata();
boolean isStriped = m.getEnsembleSize() != m.getWriteQuorumSize();
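For illustration, a minimal sketch (an assumption, not the PR's actual code) of the kind of check the useBatchRead(...) call shown earlier implies, combining the isStriped flag from this hunk with the wire-protocol setting:

// Hypothetical guard: batch read only when the V2 wire protocol is enabled and
// the ledger is not striped (ensembleSize == writeQuorumSize); a single-entry
// read gains nothing from batching, so the regular readAsync path is used then.
static boolean useBatchRead(int entriesToRead, boolean useV2WireProtocol, boolean isStriped) {
    return entriesToRead > 1 && useV2WireProtocol && !isStriped;
}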
It needs to be made clear in the documentation that enabling batch read does not necessarily mean reads will actually use the batch API.
Related BK PR: apache/bookkeeper#4485
@dao-jun Please merge the latest changes from origin/master and resolve the merge conflicts. Do you have a chance to resume work on this PR?
It would be great to have this feature included in Pulsar 4.2.0 |
Motivation
In BP-62, BookKeeper started to support a batch read API.
This PR adds support for BookKeeper batch reading on the Pulsar side.
Modifications
Add a new configuration, bookkeeperEnableBatchRead, to control whether BookKeeper batch read is enabled; the default value is false (see the configuration sketch below).
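A minimal configuration sketch: the flag name and its default come from this PR, while its placement in broker.conf alongside the other bookkeeper* client settings is an assumption:

# Enable BookKeeper batch read for managed-ledger entry reads (disabled by default).
bookkeeperEnableBatchRead=true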
Verifying this change
(Please pick either of the following options)
This change is a trivial rework / code cleanup without any test coverage.
(or)
This change is already covered by existing tests, such as (please describe tests).
(or)
This change added tests and can be verified as follows:
Does this pull request potentially affect one of the following parts:
If the box was checked, please highlight the changes
Documentation
doc
doc-required
doc-not-needed
doc-complete

Matching PR in forked repository
PR in forked repository: dao-jun#13